package edu.indiana.d2i.mapreduce;

import java.nio.ByteBuffer;

import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

import edu.indiana.d2i.util.hector.HectorHadoopClient;

public class HectorConfigHelper {
	private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class";
	private static final String INPUT_KEYSPACE_CONFIG = "cassandra.input.keyspace";
	private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace";
	private static final String INPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.input.keyspace.username";
	private static final String INPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.input.keyspace.passwd";
	private static final String OUTPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.output.keyspace.username";
	private static final String OUTPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.output.keyspace.passwd";
	private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
	private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
	private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
	private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
	private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
	private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
	private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
	private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
	private static final String THRIFT_PORT = "cassandra.thrift.port";
	private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
	private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
	private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";

	private static final String INPUT_KEY_SLICE_LIST = "cassandra.key.slice.list";
	private static final String INPUT_COLUMN_RANGE_LIST = "cassandra.column.range.list";
	private static final String CLUSTER_NAME = "cassandra.cluster.name";
	private static final String HECTOR_CLIENT = "hector.client";
	private static final String CUSTOMIZED_SPLIT = "cassandra.key.split";

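	/*
	 * Typical driver-side wiring (a minimal sketch; the host, port, cluster,
	 * and keyspace/column family names below are hypothetical placeholders):
	 *
	 *   Configuration conf = job.getConfiguration();
	 *   HectorConfigHelper.setInitialAddress(conf, "127.0.0.1");
	 *   HectorConfigHelper.setRpcPort(conf, "9160");
	 *   HectorConfigHelper.setClusterName(conf, "Test Cluster");
	 *   HectorConfigHelper.setInputColumnFamily(conf, "Keyspace1", "Standard1");
	 *   HectorConfigHelper.setOutputColumnFamily(conf, "Keyspace1", "Standard2");
	 */
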
	/**
	 * Set the keyspace and column family for the input of this job. Comparator
	 * and partitioner types are read from the cluster's own configuration
	 * (cassandra.yaml).
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @param keyspace
	 *            Keyspace to read from
	 * @param columnFamily
	 *            Column family to read from
	 */
	public static void setInputColumnFamily(Configuration conf,
			String keyspace, String columnFamily) {
		if (keyspace == null) {
			throw new IllegalArgumentException("keyspace may not be null");
		}
		if (columnFamily == null) {
			throw new IllegalArgumentException(
					"columnfamily may not be null");
		}

		conf.set(INPUT_KEYSPACE_CONFIG, keyspace);
		conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily);
	}

	/**
	 * Set the keyspace and column family for the output of this job.
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @param keyspace
	 *            Keyspace to write to
	 * @param columnFamily
	 *            Column family to write to
	public static void setOutputColumnFamily(Configuration conf,
			String keyspace, String columnFamily) {
		if (keyspace == null) {
			throw new IllegalArgumentException("keyspace may not be null");
		}
		if (columnFamily == null) {
			throw new IllegalArgumentException(
					"columnfamily may not be null");
		}

		conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
		conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily);
	}

	/**
	 * The number of rows to request with each get_range_slices request. Too
	 * big and you risk timeouts when Cassandra takes too long to fetch all the
	 * data; too small and performance is eaten up by the overhead of each
	 * request.
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @param batchsize
	 *            Number of rows to request each time
	 */
	public static void setRangeBatchSize(Configuration conf, int batchsize) {
		conf.setInt(RANGE_BATCH_SIZE_CONFIG, batchsize);
	}

	/**
	 * The number of rows requested with each get_range_slices request; see
	 * {@link #setRangeBatchSize(Configuration, int)} for the trade-offs.
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @return Number of rows to request each time
	 */
	public static int getRangeBatchSize(Configuration conf) {
		return conf.getInt(RANGE_BATCH_SIZE_CONFIG, DEFAULT_RANGE_BATCH_SIZE);
	}

	/**
	 * Set the size of the input split. This affects the number of maps
	 * created: if the number is too small, the overhead of each map will take
	 * up the bulk of the job time.
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @param splitsize
	 *            Size of the input split
	 */
	public static void setInputSplitSize(Configuration conf, int splitsize) {
		conf.setInt(INPUT_SPLIT_SIZE_CONFIG, splitsize);
	}

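	/**
	 * @param conf
	 *            Job configuration you are about to run
	 * @return Size of the input split, or the default of 64 * 1024 when unset
	 */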
	public static int getInputSplitSize(Configuration conf) {
		return conf.getInt(INPUT_SPLIT_SIZE_CONFIG, DEFAULT_SPLIT_SIZE);
	}

	/**
	 * Set the predicate that determines what columns will be selected from
	 * each row.
	 * 
	 * @param conf
	 *            Job configuration you are about to run
	 * @param predicate
	 *            Columns (or column range) to select from each row
	 */
	public static void setInputSlicePredicate(Configuration conf,
			SlicePredicate predicate) {
		conf.set(INPUT_PREDICATE_CONFIG, predicateToString(predicate));
	}
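
	/*
	 * Building a predicate for the setter above (a sketch; the column names
	 * "name" and "value" are hypothetical):
	 *
	 *   SlicePredicate predicate = new SlicePredicate();
	 *   predicate.addToColumn_names(ByteBufferUtil.bytes("name"));
	 *   predicate.addToColumn_names(ByteBufferUtil.bytes("value"));
	 *   HectorConfigHelper.setInputSlicePredicate(conf, predicate);
	 */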

	public static SlicePredicate getInputSlicePredicate(Configuration conf) {
		return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
	}

	public static String getRawInputSlicePredicate(Configuration conf) {
		return conf.get(INPUT_PREDICATE_CONFIG);
	}

	private static String predicateToString(SlicePredicate predicate) {
		assert predicate != null;
		// Thrift-serialize the predicate, then hex-encode it so it can be
		// stored as a plain string in the Hadoop Configuration.
		TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
		try {
			return FBUtilities.bytesToHex(serializer.serialize(predicate));
		} catch (TException e) {
			throw new RuntimeException(e);
		}
	}

	private static SlicePredicate predicateFromString(String st) {
		assert st != null;
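		// Reverse of predicateToString: hex-decode, then Thrift-deserialize.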
		TDeserializer deserializer = new TDeserializer(
				new TBinaryProtocol.Factory());
		SlicePredicate predicate = new SlicePredicate();
		try {
			deserializer.deserialize(predicate, FBUtilities.hexToBytes(st));
		} catch (TException e) {
			throw new RuntimeException(e);
		}
		return predicate;
	}

	public static String getInputKeyspace(Configuration conf) {
		return conf.get(INPUT_KEYSPACE_CONFIG);
	}

	public static String getOutputKeyspace(Configuration conf) {
		return conf.get(OUTPUT_KEYSPACE_CONFIG);
	}

	public static String getInputKeyspaceUserName(Configuration conf) {
		return conf.get(INPUT_KEYSPACE_USERNAME_CONFIG);
	}

	public static String getInputKeyspacePassword(Configuration conf) {
		return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
	}

	public static String getOutputKeyspaceUserName(Configuration conf) {
		return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG);
	}

	public static String getOutputKeyspacePassword(Configuration conf) {
		return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);
	}

	public static String getInputColumnFamily(Configuration conf) {
		return conf.get(INPUT_COLUMNFAMILY_CONFIG);
	}

	public static String getOutputColumnFamily(Configuration conf) {
		return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
	}

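	/**
	 * @return The read consistency level name (e.g. "ONE" or "QUORUM");
	 *         defaults to "ONE" when unset.
	 */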
	public static String getReadConsistencyLevel(Configuration conf) {
		return conf.get(READ_CONSISTENCY_LEVEL, "ONE");
	}

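	/**
	 * @return The write consistency level name; defaults to "ONE" when unset.
	 */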
	public static String getWriteConsistencyLevel(Configuration conf) {
		return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE");
	}

	public static int getRpcPort(Configuration conf) {
		// Fall back to Cassandra's default Thrift port when unset.
		return conf.getInt(THRIFT_PORT, 9160);
	}

	public static void setRpcPort(Configuration conf, String port) {
		conf.set(THRIFT_PORT, port);
	}

	public static String getInitialAddress(Configuration conf) {
		return conf.get(INITIAL_THRIFT_ADDRESS);
	}

	public static void setInitialAddress(Configuration conf, String address) {
		conf.set(INITIAL_THRIFT_ADDRESS, address);
	}

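	/**
	 * Set the fully qualified partitioner class name. This must match the
	 * partitioner the cluster itself runs with, e.g.
	 * "org.apache.cassandra.dht.RandomPartitioner".
	 */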
	public static void setPartitioner(Configuration conf, String classname) {
		conf.set(PARTITIONER_CONFIG, classname);
	}

	public static IPartitioner getPartitioner(Configuration conf) {
		try {
			return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG));
		} catch (ConfigurationException e) {
			throw new RuntimeException(e);
		}
	}
	
	public static void setKeySliceList(Configuration conf, String[] keylist) {
		// Keys are stored space-separated, so they must not contain spaces.
		StringBuilder strBuilder = new StringBuilder();
		for (int i = 0; i < keylist.length; i++) {
			if (i > 0) {
				strBuilder.append(' ');
			}
			strBuilder.append(keylist[i]);
		}
		conf.set(INPUT_KEY_SLICE_LIST, strBuilder.toString());
	}
	
	public static ByteBuffer[] getKeySliceList(Configuration conf) {
		String[] keylist = conf.get(INPUT_KEY_SLICE_LIST).split(" ");
		ByteBuffer[] list = new ByteBuffer[keylist.length];
		for (int i = 0; i < keylist.length; i++) {
			list[i] = ByteBufferUtil.bytes(keylist[i]);
		}
		return list;
	}
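
	/*
	 * Round-trip sketch for the key-slice list ("row1" and "row2" are
	 * hypothetical keys; keys must not contain spaces, per the encoding in
	 * setKeySliceList):
	 *
	 *   HectorConfigHelper.setKeySliceList(conf, new String[] { "row1", "row2" });
	 *   ByteBuffer[] keys = HectorConfigHelper.getKeySliceList(conf);
	 */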
	
	public static void setClusterName(Configuration conf, String clusterName) {
		conf.set(CLUSTER_NAME, clusterName);
	}
	
	public static String getClusterName(Configuration conf) {
		return conf.get(CLUSTER_NAME);
	}
	
	public static void setHectorClientClass(Configuration conf, Class<? extends HectorHadoopClient> cls) {
		conf.setClass(HECTOR_CLIENT, cls, HectorHadoopClient.class);
	}
	
	public static Class<?> getHectorClientClass(Configuration conf) {
		return conf.getClass(HECTOR_CLIENT, HectorHadoopClient.class);
	}
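
	/*
	 * Plugging in a client implementation (MyHectorClient is a hypothetical
	 * subclass of HectorHadoopClient):
	 *
	 *   HectorConfigHelper.setHectorClientClass(conf, MyHectorClient.class);
	 *   Class<?> cls = HectorConfigHelper.getHectorClientClass(conf);
	 */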
}
